package drds.plus.executor.transaction.strict;


import drds.plus.datanode.api.Connection;
import drds.plus.datanode.api.DatasourceManager;
import drds.plus.executor.ExecuteContext;
import drds.plus.executor.transaction.AbstractTransaction;
import drds.plus.executor.transaction.strict_write_with_non_transaction_cross_database_read.ReadWrite;
import drds.plus.sql_process.abstract_syntax_tree.execute_plan.ExecutePlan;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;

import java.sql.SQLException;
import java.util.Set;

/**
 * 建立连接则开启事务,只会保留一个连接
 */
@Slf4j
public class StrictTransaction extends AbstractTransaction {

    /**
     * 当前进行事务的节点
     */
    protected String transactionDataNodeId = null;

    private drds.plus.executor.transaction.ConnectionHolder connectionHolder;

    public StrictTransaction(ExecuteContext executeContext) {
        super(executeContext);
        this.connectionHolder = new StrictConnectionHolder();
    }

    public Connection getConnection(@NonNull String dataNodeId, DatasourceManager datasourceManager, ReadWrite readWrite) throws SQLException {
        lock.lock();
        try {
            Connection connection = null;
            if (transactionDataNodeId != null) {
                // 已经有事务链接了
                if (transactionDataNodeId.equalsIgnoreCase(dataNodeId) || ExecutePlan.USE_LAST_DATA_NODE.equals(dataNodeId)) {
                    connection = this.getConnectionHolder().getConnection(transactionDataNodeId, datasourceManager);//会拿到原来开启事务的连接
                    return connection;
                } else {
                    throw new IllegalStateException(transactionDataNodeId + dataNodeId);
                }

            } else {
                // 没有事务建立，新建事务
                transactionDataNodeId = dataNodeId;
                connection = this.getConnectionHolder().getConnection(transactionDataNodeId, datasourceManager);
                connection.setAutoCommit(false);
                return connection;
            }
        } finally {
            lock.unlock();
        }
    }

    public void commit() throws RuntimeException {

        lock.lock();

        try {
            Set<Connection> connectionSet = this.getConnectionHolder().getConnectionSet();

            for (Connection connection : connectionSet) {
                try {
                    connection.commit();
                } catch (SQLException e) {
                    throw new RuntimeException(e);
                }
            }

            close();
        } finally {
            lock.unlock();
        }

    }

    public void rollback() throws RuntimeException {
        lock.lock();

        try {

            Set<Connection> connectionSet = this.getConnectionHolder().getConnectionSet();

            for (Connection connection : connectionSet) {
                try {
                    connection.rollback();
                } catch (SQLException e) {
                    throw new RuntimeException(e);
                }
            }

            close();
        } finally {
            lock.unlock();
        }
    }

    public drds.plus.executor.transaction.ConnectionHolder getConnectionHolder() {
        return connectionHolder;
    }

    public void tryClose(String dataNodeId, Connection connection) throws SQLException {
        lock.lock();

        try {
            this.getConnectionHolder().tryClose(dataNodeId, connection);
        } finally {
            lock.unlock();
        }

    }

    public void close() {

        if (isClosed()) {
            return;
        }

        lock.lock();

        try {
            super.close();

            this.getConnectionHolder().closeConnectionSet();

        } finally {
            lock.unlock();
        }
    }

    public void tryClose() throws SQLException {

    }

}
